How to Write a Spark DataFrame

The concept of writing data back to the source system is fundamentally the same as reading it. You need a writer, and hence Spark offers the DataFrameWriter interface. A typical write involves four methods.

  • format
  • option
  • mode
  • save
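Put together, a minimal write chains these four methods. This is an illustrative sketch; the CSV format, header option, and output path are placeholders.

```scala
// Minimal DataFrameWriter sketch: format, option, mode, save.
// Assumes `df` is an existing DataFrame; the path is a placeholder.
df.write
  .format("csv")            // target file format
  .option("header", "true") // format-specific option
  .mode("overwrite")        // what to do if the destination already exists
  .save("/tmp/output")      // destination path
```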

How to handle a file that already exists

However, while working with a file-based destination, you may encounter a situation where the file already exists. To handle that scenario, the mode method takes one of the following save modes.

  • append
  • overwrite
  • errorIfExists
  • ignore
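Each save mode can be passed either as a SaveMode constant or as its string equivalent. A short sketch of all four, using an illustrative placeholder path:

```scala
import org.apache.spark.sql.SaveMode

// Assumes `df` is an existing DataFrame; /tmp/out is a placeholder path.
df.write.mode(SaveMode.Append).parquet("/tmp/out") // add to existing data
df.write.mode("overwrite").parquet("/tmp/out")     // replace existing data
df.write.mode("ignore").parquet("/tmp/out")        // skip the write silently
// "errorIfExists" (the default) would throw an AnalysisException here,
// because /tmp/out already exists after the writes above:
// df.write.mode(SaveMode.ErrorIfExists).parquet("/tmp/out")
```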
The same pattern applies to external systems. For example, writing the DataFrame to a Snowflake table (sfOptions holds your Snowflake connection options):
df.write
    .format("snowflake")
    .options(sfOptions)
    .option("dbtable", "EMPLOYEE")
    .mode(SaveMode.Overwrite)
    .save()

Additional Spark DataFrame write options

If you are working with a file-based destination system, you also have three additional methods.

  • partitionBy
  • bucketBy
  • sortBy

The partitionBy method allows you to partition your output into subdirectories, and the bucketBy method allows you to distribute the data into a finite number of buckets. Both methods follow Hive's partitioning and bucketing principles.
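A sketch combining the three methods; the table and column names are illustrative. Note that bucketBy and sortBy require writing to a managed table with saveAsTable rather than save.

```scala
// Assumes `df` has "country" and "customer_id" columns (illustrative names).
df.write
  .format("parquet")
  .partitionBy("country")     // one subdirectory per distinct country value
  .bucketBy(8, "customer_id") // 8 buckets, hashed on customer_id
  .sortBy("customer_id")      // sort rows within each bucket
  .mode("overwrite")
  .saveAsTable("sales_bucketed") // bucketBy/sortBy need saveAsTable
```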

Write Data into a Hive Partitioned Table

var dbName = "your database name"
var finaltable = "your table name"

// First, check whether the table already exists.
if (spark.sql("show tables in " + dbName).filter("tableName = '" + finaltable + "'").collect().length == 0) {

  // The table is not available, so create it.
  println("Table not present\nCreating table " + finaltable)

  spark.sql("use " + dbName)
  spark.sql("SET hive.exec.dynamic.partition = true")
  spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
  spark.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")

  spark.sql("create table " + dbName + "." + finaltable + " (" +
    "EMP_ID string, " +
    "EMP_Name string, " +
    "EMP_Address string, " +
    "EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)")
}

// The table now exists; insert the DataFrame in append mode.
df.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable)

Different ways to write the data into different file formats

    Read CSV -> Write Parquet
    Read Parquet -> Write JSON
    Read JSON -> Write ORC
    Read ORC -> Write XML
    Read XML -> Write AVRO
    Read AVRO -> Write CSV

//Read CSV into Data Frame
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NA")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("mode", "failfast")
  .load("/spark-data/mental-health-in-tech-survey/survey.csv")

//Write Data Frame to Parquet
df.write
  .format("parquet")
  .mode("overwrite")
  .save("/spark-data/mental-health-in-tech-survey/parquet-data/")

//Read Parquet into Data Frame
val df = spark.read
  .format("parquet")
  .option("mode", "failfast")
  .load("/spark-data/mental-health-in-tech-survey/parquet-data/")

//Write Data Frame to JSON
df.write
  .format("json")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .mode("overwrite")
  .save("/spark-data/mental-health-in-tech-survey/json-data/")

//Read JSON into Data Frame
val df = spark.read
  .format("json")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/json-data/")

//Write Data Frame to ORC
df.write
  .format("orc")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/orc-data/")


// The XML and Avro connectors are external packages, so launch
// spark-shell with them on the classpath:
spark-shell --packages com.databricks:spark-xml_2.11:0.4.1,com.databricks:spark-avro_2.11:4.0.0

//Read ORC into Data Frame
val df = spark.read
  .format("orc")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/orc-data/")

//Write Data Frame to XML
df.write
  .format("com.databricks.spark.xml")
  .option("rootTag", "survey")
  .option("rowTag", "survey-row")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/xml-data/")

//Read XML into Data Frame
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "survey-row")
  .option("mode", "failfast")
  .load("/home/prashant/spark-data/mental-health-in-tech-survey/xml-data/")

//Write Data Frame to AVRO
df.write
  .format("com.databricks.spark.avro")
  .mode("overwrite")
  .save("/home/prashant/spark-data/mental-health-in-tech-survey/avro-data/")
 
